/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.ubox.mqtt_upush.mqttclient;

import static org.fusesource.hawtbuf.UTF8Buffer.utf8;

import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;

import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ubox.mqtt_upush.utils.NetworkUtils;

public class FuseMQTTClientProvider implements MQTTClientProvider {
	
	private final static Logger logger = LoggerFactory.getLogger(FuseMQTTClientProvider.class);
	private final MQTT mqtt = new MQTT();
	private String url;
	private BlockingConnection connection;
	@Override
	public void connect(String host) throws Exception {
		mqtt.setHost(host);
		// shut off connect retry
		mqtt.setConnectAttemptsMax(0);
		mqtt.setUserName("upush");
		mqtt.setPassword("public");
		mqtt.setVersion("3.1.1");
		mqtt.setReconnectAttemptsMax(0);
		mqtt.setClientId("uboxSend"+NetworkUtils.getLocalv4IP());
		connection = mqtt.blockingConnection();
		connection.connect();
	}

	public void connect() throws Exception {
		mqtt.setHost(url);
		// shut off connect retry
		mqtt.setConnectAttemptsMax(0);
		mqtt.setUserName("upush");
		mqtt.setReconnectAttemptsMax(0);
		connection = mqtt.blockingConnection();
		connection.connect();
	}

	@Override
	public void disconnect() throws Exception {
		if (this.connection != null) {
			this.connection.disconnect();
		}
	}

	@Override
	public void publish(String topic, byte[] payload, int qos) throws Exception {
		publish(topic, payload, qos, false);
	}

	@Override
	public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception {
		connection.publish(topic, payload, QoS.values()[qos], retained);
	}

	@Override
	public void subscribe(String topic, int qos) throws Exception {
		Topic[] topics = { new Topic(utf8(topic), QoS.values()[qos]) };
		connection.subscribe(topics);
	}

	@Override
	public void unsubscribe(String topic) throws Exception {
		connection.unsubscribe(new String[] { topic });
	}

	@Override
	public byte[] receive(int timeout) throws Exception {
		byte[] result = null;
		Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
		if (message != null) {
			result = message.getPayload();
			message.ack();
		}
		return result;
	}

	@Override
	public void setSslContext(SSLContext sslContext) {
		mqtt.setSslContext(sslContext);
	}

	@Override
	public void setWillMessage(String string) {
		mqtt.setWillMessage(string);
	}

	@Override
	public void setWillTopic(String topic) {
		mqtt.setWillTopic(topic);
	}

	@Override
	public void setClientId(String clientId) {
		mqtt.setClientId(clientId);
	}

	@Override
	public void kill() throws Exception {
		connection.kill();
	}

	@Override
	public void setKeepAlive(int keepAlive) throws Exception {
		mqtt.setKeepAlive((short) keepAlive);
	}

	public String getUrl() {
		return url;
	}

	public void setUrl(String url) {
		this.url = url;
	}

	@Override
	public boolean isConnected() throws Exception {
		if (connection == null)
			return false;

		return connection.isConnected();
	}

	@Override
	public BlockingConnection getConnection() {
		return connection;
	}

	@Override
	public String getUsername() {
		// TODO Auto-generated method stub
		return mqtt.getUserName().toString();
	}
}
